package com.derbysoft.nuke.kafka.manager;

import com.derbysoft.nuke.kafka.manager.domain.model.*;
import com.derbysoft.nuke.kafka.manager.domain.service.ClusterService;
import com.derbysoft.nuke.kafka.manager.dto.*;
import com.derbysoft.nuke.kafka.manager.infrastructure.jmx.JMXService;
import com.derbysoft.nuke.kafka.manager.infrastructure.kafka.KafkaAdminClient;
import com.derbysoft.nuke.kafka.manager.infrastructure.kafka.KafkaClient;
import com.derbysoft.nuke.kafka.manager.infrastructure.kafka.model.Message;
import com.derbysoft.nuke.kafka.manager.infrastructure.kafka.model.OffsetTimestamp;
import com.derbysoft.nuke.kafka.manager.infrastructure.zookeeper.ZookeeperClient;
import com.derbysoft.nuke.kafka.manager.infrastructure.zookeeper.ZookeeperService;
import com.derbysoft.nuke.kafka.manager.infrastructure.zookeeper.ZookeeperServiceImpl;
import com.derbysoft.nuke.kafka.manager.infrastructure.zookeeper.data.BrokerInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.Node;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.*;
import java.util.stream.Collectors;

@Service
public class KafkaManagerServiceImpl implements KafkaManagerService {

    @Autowired
    private KafkaClient kafkaClient;

    @Autowired
    private ZookeeperClient zookeeperClient;

    @Autowired
    private JMXService jmxService;

    @Autowired
    private ObjectMapper objectMapper;

    // NOTE(review): clusterService is injected but never referenced in this class —
    // confirm it is actually needed before removing.
    @Autowired
    private ClusterService clusterService;

    /**
     * Caches the resolved bootstrap-server string per ZooKeeper connect string so
     * ZooKeeper is not queried on every Kafka operation. Entries expire after one
     * minute so broker membership changes are picked up promptly.
     */
    private final LoadingCache<String, String> bootstrapServersCache = CacheBuilder.newBuilder()
            .expireAfterWrite(Duration.ofMinutes(1))
            .build(new CacheLoader<String, String>() {
                @Override
                public String load(String key) throws Exception {
                    // NOTE(review): servers are joined with ';' — Kafka's own
                    // bootstrap.servers property is comma-separated; confirm that
                    // KafkaClient splits this string on ';'.
                    return getBrokers(key).stream()
                            .map(Broker::getBootstrapServer)
                            .collect(Collectors.joining(";"));
                }
            });

    /** Reads the live broker list for the given ZooKeeper connect string. */
    private List<Broker> getBrokers(String connectString) {
        ZookeeperService zookeeperService = zookeeperService(connectString);
        return zookeeperService.getBrokers().stream()
                .map(BrokerInfo::toBroker)
                .collect(Collectors.toList());
    }

    /** Builds a ZookeeperService bound to the given connect string. */
    private ZookeeperService zookeeperService(String connectString) {
        // Local renamed from "zookeeperClient" to avoid shadowing the injected field.
        CuratorFramework curator = this.zookeeperClient.getZookeeperClient(connectString);
        return new ZookeeperServiceImpl(curator, objectMapper);
    }

    /**
     * Returns the (cached) bootstrap-server string for a ZooKeeper connect string.
     */
    public String getBootstrapServers(String connectString) {
        return bootstrapServersCache.getUnchecked(connectString);
    }

    private String getBootstrapServers(Cluster cluster) {
        return getBootstrapServers(cluster.getZookeeperServers());
    }

    /**
     * Describes a single topic.
     *
     * @throws IllegalStateException if the topic cannot be described or is not found
     */
    @Override
    public Topic getTopic(Cluster cluster, String topicName) {
        List<Topic> topics = describeTopics(cluster, Sets.newHashSet(topicName));
        if (topics.isEmpty()) {
            // Fail with a descriptive message instead of a bare IndexOutOfBoundsException.
            throw new IllegalStateException(
                    "Topic[" + topicName + "] not found in cluster[" + cluster.getZookeeperServers() + "]");
        }
        return topics.get(0);
    }

    /**
     * Lists and describes every topic (including internal topics) in the cluster.
     *
     * @throws IllegalStateException if the topic listing fails
     */
    @Override
    public List<Topic> getTopics(Cluster cluster) {
        try {
            AdminClient adminClient = kafkaClient.getAdminClient(getBootstrapServers(cluster));
            ListTopicsResult listTopicsResult = adminClient.listTopics(new ListTopicsOptions().listInternal(true));
            Set<String> topicNames = listTopicsResult.names().get();
            return describeTopics(cluster, topicNames);
        } catch (Exception e) {
            throw new IllegalStateException("List topics of cluster[" + cluster.getZookeeperServers() + "] failed", e);
        }
    }

    @Override
    public TopicSummary getTopicSummary(Cluster cluster, String topicName) {
        return toTopicSummary(getTopic(cluster, topicName));
    }

    /**
     * Returns, per partition of the topic, the earliest and latest offset together
     * with their timestamps.
     */
    @Override
    public List<TopicPartitionSummary> getTopicPartitionSummaries(Cluster cluster, String topicName) {
        KafkaAdminClient kafkaAdminClient = kafkaClient.getKafkaAdminClient(getBootstrapServers(cluster));

        Map<Integer, OffsetTimestamp> startOffsetTimestamps = kafkaAdminClient.getStartOffsetTimestamps(topicName);
        Map<Integer, OffsetTimestamp> endOffsetTimestamps = kafkaAdminClient.getEndOffsetTimestamps(topicName);
        Topic topic = getTopic(cluster, topicName);
        return topic.getPartitions().stream()
                .map(tp -> TopicPartitionSummary.builder()
                        .topicPartition(tp)
                        // May be null if the admin client returned no data for this partition.
                        .start(startOffsetTimestamps.get(tp.getPartition()))
                        .end(endOffsetTimestamps.get(tp.getPartition()))
                        .build())
                .collect(Collectors.toList());
    }

    /**
     * Fetches up to {@code recordsPerPartition} messages from each requested
     * partition starting at the requested offsets, and reports the highest offset
     * actually seen per partition so the caller can continue from there.
     */
    @Override
    public TopicMessageSeekResponse seekMessages(Cluster cluster, String topicName, TopicMessageSeekRequest topicMessageSeekRequest) {
        KafkaAdminClient kafkaAdminClient = kafkaClient.getKafkaAdminClient(getBootstrapServers(cluster));

        List<PartitionSeekInfo> partitionSeekInfos = topicMessageSeekRequest.getPartitionOffsets();
        Map<org.apache.kafka.common.TopicPartition, Long> topicPartitionOffsets = partitionSeekInfos.stream()
                .collect(Collectors.toMap(
                        psi -> new org.apache.kafka.common.TopicPartition(topicName, psi.getPartition()),
                        PartitionSeekInfo::getSeekOffset
                ));
        List<Message<String, String>> messages = kafkaAdminClient.fetchMessages(topicPartitionOffsets, topicMessageSeekRequest.getRecordsPerPartition());

        Map<Integer, Long> lastOffsets = getLastOffsets(partitionSeekInfos, messages);
        return TopicMessageSeekResponse.builder()
                .messages(messages)
                .lastOffsets(lastOffsets)
                .build();
    }

    /**
     * Starts from each partition's requested seek offset and, for every partition
     * that returned messages, advances to the maximum offset observed. Partitions
     * that returned nothing keep their original seek offset.
     */
    private Map<Integer, Long> getLastOffsets(List<PartitionSeekInfo> partitionSeekInfos, List<Message<String, String>> messages) {
        Map<Integer, Long> offsets = partitionSeekInfos.stream()
                .collect(Collectors.toMap(PartitionSeekInfo::getPartition, PartitionSeekInfo::getSeekOffset));
        if (messages.isEmpty()) {
            return offsets;
        }
        Map<Integer, List<Message<String, String>>> groupByPartitions = messages.stream()
                .collect(Collectors.groupingBy(Message::getPartition, Collectors.toList()));
        for (Map.Entry<Integer, List<Message<String, String>>> entry : groupByPartitions.entrySet()) {
            Integer partition = entry.getKey();
            // Only advance partitions that were part of the original seek request.
            if (offsets.containsKey(partition)) {
                // getAsLong is safe: groupingBy never produces an empty group.
                long newSeekOffset = entry.getValue().stream().mapToLong(Message::getOffset).max().getAsLong();
                offsets.put(partition, newSeekOffset);
            }
        }
        return offsets;
    }

    @Override
    public List<TopicSummary> listTopicSummaries(Cluster cluster) {
        return getTopics(cluster).stream()
                .map(this::toTopicSummary)
                .collect(Collectors.toList());
    }

    @Override
    public List<Broker> getBrokers(Cluster cluster) {
        return getBrokers(cluster.getZookeeperServers());
    }

    /** Collects JMX metrics from every broker in the cluster. */
    @Override
    public List<BrokerMetrics> getBrokersMetrics(Cluster cluster) {
        return getBrokers(cluster).stream()
                .map(this::getBrokerMetrics)
                .collect(Collectors.toList());
    }

    /** Collects OS, socket-server and topic JMX metrics for one broker. */
    private BrokerMetrics getBrokerMetrics(Broker broker) {
        Map<String, Object> metrics = Maps.newHashMap();
        metrics.put("OS", this.jmxService.getOSMetrics(broker));
        metrics.put("SocketServerMetrics", this.jmxService.getSocketServerMetrics(broker));
        metrics.put("BrokerTopicMetrics", this.jmxService.getBrokerTopicMetrics(broker));
        return BrokerMetrics.builder().broker(broker).metrics(metrics).build();
    }

    private TopicSummary toTopicSummary(Topic topic) {
        return TopicSummary.builder()
                .topicName(topic.getName())
                .partitions(topic.getPartitions().size())
                .build();
    }

    /**
     * Describes the given topics via the Kafka AdminClient and converts them to
     * domain {@link Topic} objects.
     *
     * @throws IllegalStateException if the describe call fails
     */
    private List<Topic> describeTopics(Cluster cluster, Set<String> topicNames) {
        try {
            AdminClient adminClient = kafkaClient.getAdminClient(getBootstrapServers(cluster));
            DescribeClusterResult describeClusterResult = adminClient.describeCluster();
            Collection<Node> brokers = describeClusterResult.nodes().get();

            DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topicNames);
            return convert(cluster, brokers, describeTopicsResult.all().get().values());
        } catch (Exception e) {
            throw new IllegalStateException("Describe topics[" + StringUtils.join(topicNames, ", ") + "] failed", e);
        }
    }

    /**
     * Maps Kafka client {@link TopicDescription}s (plus the cluster's broker nodes)
     * onto the domain model. Returns an empty list for a null input.
     */
    public List<Topic> convert(Cluster cluster, Collection<Node> brokers, Collection<TopicDescription> topicDescriptions) {
        List<Topic> topics = new ArrayList<>();
        if (topicDescriptions != null) {
            topicDescriptions.forEach(topicDescription -> {
                List<TopicPartition> partitions = new ArrayList<>();
                topicDescription.partitions().forEach(topicPartitionInfo -> {
                    TopicPartition topicPartition = new TopicPartition();
                    topicPartition.setPartition(topicPartitionInfo.partition());
                    // Leader may legitimately be null (e.g. leaderless partition).
                    topicPartition.setLeader(toNode(topicPartitionInfo.leader()));
                    topicPartition.setIsr(toNodes(topicPartitionInfo.isr()));
                    topicPartition.setReplicas(toNodes(topicPartitionInfo.replicas()));
                    partitions.add(topicPartition);
                });

                Topic topic = new Topic();
                topic.setName(topicDescription.name());
                topic.setInternal(topicDescription.isInternal());
                topic.setPartitions(partitions);
                topic.setBrokers(toNodes(brokers));
                topic.setCluster(cluster);

                topics.add(topic);
            });
        }
        return topics;
    }

    private List<com.derbysoft.nuke.kafka.manager.domain.model.Node> toNodes(Collection<Node> nodes) {
        return nodes.stream()
                .map(this::toNode)
                .collect(Collectors.toList());
    }

    /** Converts a Kafka client node to the domain node; passes null through. */
    private com.derbysoft.nuke.kafka.manager.domain.model.Node toNode(Node node) {
        if (node == null) return null;
        com.derbysoft.nuke.kafka.manager.domain.model.Node n = new com.derbysoft.nuke.kafka.manager.domain.model.Node();
        n.setId(node.id());
        n.setHost(node.host());
        n.setPort(node.port());
        n.setRack(node.rack());
        return n;
    }

    /** Setter retained for test/manual wiring. */
    public void setKafkaClient(KafkaClient kafkaClient) {
        this.kafkaClient = kafkaClient;
    }
}
